package cn.hg.solon.youcan.job.utils;

import org.dromara.hutool.core.data.id.IdUtil;
import org.dromara.hutool.core.exception.ExceptionUtil;
import org.dromara.hutool.core.text.StrValidator;
import org.dromara.hutool.core.thread.ThreadUtil;
import org.noear.redisx.RedisClient;
import org.noear.redisx.plus.RedisBucket;
import org.noear.solon.Solon;

import cn.hg.solon.youcan.job.entity.Job;


/**
 * 定时任务处理（禁止并发执行）
 * 
 * @author ruoyi
 *
 */
public class CronDisallowConcurrentExecution extends AbstractCronJob
{

    /** 所有缓存Key的前缀 */
    private static final String CACHE_KEY_PREFIX = "distributed-job-key:";

    private RedisBucket redisBucket;

    // 锁的过期时间，单位毫秒，默认 1 分钟，
    // 因此，配置的定时任务，每次执行任务的时间间隔，必须大于 1 分钟以上
    private int expire = 60;
    
    private String key;
    
	public CronDisallowConcurrentExecution(Job job) {
		super(job);
		
        Solon.context().getBeanAsync(RedisClient.class, redisClient -> {
            this.redisBucket = redisClient.getBucket();
        });
		
        this.key = CACHE_KEY_PREFIX + job.getId();
	}
	
    @Override
    protected Object doExecute() throws Throwable {
        if (this.redisBucket == null) {
            // 当未配置 redis 的时候，默认使用本地任务的方式进行执行
            return this.invoke();
        }

        Long setTimeMillis = System.currentTimeMillis();

        //要设置的值
        String setValue = setTimeMillis + ":" + IdUtil.fastSimpleUUID();


        boolean locked = false;

        for (int i = 0; i < 5; i++) {
            // result: 设置成功，返回 true，设置失败，返回 false
            Boolean result =
                StrValidator.isNotBlank(this.redisBucket.getOrStore(this.key, this.expire, () -> setValue));

            // success 设置成功
            if (result) {
                String value = this.redisBucket.get(this.key);

                //在分布式的场景下，可能自己设置成功后，又被其他节点删除重新设置的情况
                //需要判断是否是当前节点（或者线程）设置的
                if (setValue.equals(value)) {
                    locked = true;
                    break;
                }

                this.quietlySleep();

                continue;
            }
            if (!result) {

                String value = null;

                try {
                    value = this.redisBucket.get(this.key);
                } catch (Exception ex) {
                    // LogKit.logNothing(ex);
                }


                //获取不到，一般不会出现这种情况，除非是网络异常等原因
                //或者是使用了已经存在的 key，但是此 key 已经有其他序列化方式的值导致异常
                if (value == null) {
                    this.reset();
                    this.quietlySleep();
                    continue;
                }

                String[] split = value.split(":");

                //被其他节点，或者手动操作 redis 的方式给设置了这个key值
                if (split.length != 2) {
                    this.reset();
                    continue;
                }

                //获取设置的时间
                long savedTimeMillis = 0;

                try {
                    savedTimeMillis = Long.parseLong(split[0]);
                } catch (NumberFormatException ex) {
                    // LogKit.logNothing(ex);
                }

                //redis 存储的内容有问题，可能是被手动设置 redis 的方式设置了这个 key 值
                if (savedTimeMillis == 0) {
                    this.reset();
                    continue;
                }

                if ((System.currentTimeMillis() - savedTimeMillis) > this.expire) {
                    //若设置锁的时间以及过期了
                    //说明是上一次任务配置的，此时需要删除这个过期的 key，然后重新去抢
                    this.reset();
                }
                // 若锁没有过期，休息后重新去抢，因为抢走的线程可能会重新释放
                else {
                    this.quietlySleep();
                }
            }

        }

        //抢了5次都抢不到，证明已经被别的应用抢走了
        if (!locked) {
            return null;
        }


        try {
            return this.invoke();
        }

        // 如果 run() 执行异常，让别的分布式应用APP去执行
        // 但如果 run() 执行的时间很长（超过30秒），而且失败了，那么别的分布式应用可能也抢不到了，只能等待下次任务
        // 作用：故障转移
        catch (Throwable ex) {
            this.reset();

            throw ExceptionUtil.getRootCause(ex);
        }
    }

    private Object invoke() throws Throwable {
		return JobInvokeUtil.invokeMethod(this.getJob());
	}

    public void quietlySleep() {
        int millis = 2000;
        if (this.expire <= 2000) {
            millis = 100;
        } else if (this.expire <= 5000) {
            millis = 500;
        } else if (this.expire <= 300000) {
            millis = 1000;
        }

        ThreadUtil.safeSleep(millis);
    }

	/**
     * 重置分布式的 key
     */
    private void reset() {
        try {
            this.redisBucket.remove(this.key);
        } catch (Exception ex) {
            // LogKit.logNothing(ex);
        }
    }
}
